package com.joysuccess.consumer.mina.service;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer for collected device data published on {@link #DLJK_DATA_TOPIC_0001}.
 *
 * <p>Each message is expected to be a JSON object carrying an {@code assetId} and an
 * {@code oidList}; the entries of {@code oidList} are written into the Redis hash stored
 * under the {@code assetId} key.
 *
 * <p>NOTE(review): the original comment described this as the BACnet protocol topic consumer
 * that forwards to both Redis and ES; only the Redis write is present in this class.
 *
 * @author zhangqing
 * @date 2019-05-04
 */
@Slf4j
@Component
public class MinaConsumer {

    /** Name of the Kafka topic this consumer subscribes to. */
    public static final String DLJK_DATA_TOPIC_0001 = "DLJK-COL-MSG-DATA-0001";

    // Kept as a raw type on purpose: adding type arguments could change which RedisTemplate
    // bean (and therefore which serializers) Spring injects here.
    @SuppressWarnings("rawtypes")
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Consumes one record and stores its {@code oidList} entries in the Redis hash keyed by
     * the record's {@code assetId}.
     *
     * <p>Records that cannot be stored (null/empty payload, missing {@code assetId}, missing or
     * unsupported {@code oidList}) are logged at WARN level and skipped instead of failing with
     * a {@code NullPointerException} or {@code ClassCastException}. Entries whose value is JSON
     * {@code null} are left out of the hash. A payload that is not valid JSON still raises the
     * parser's exception, which propagates to the listener container.
     *
     * @param record the Kafka record whose value is the JSON payload
     */
    @KafkaListener(topics = DLJK_DATA_TOPIC_0001, groupId = "MinaConsumerGroup")
    @SuppressWarnings("unchecked")
    public void onMessageRedisString(ConsumerRecord<?, String> record) {
        // parseObject returns null for a null or empty payload.
        JSONObject jsonObject = JSONObject.parseObject(record.value());
        if (jsonObject == null) {
            log.warn("Skipping record with empty payload: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            return;
        }

        // getString also accepts a numeric assetId, which the former (String) cast rejected.
        String assetId = jsonObject.getString("assetId");
        if (assetId == null) {
            log.warn("Skipping record without assetId: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            return;
        }

        JSONObject oidJsonObject = toOidObject(jsonObject.get("oidList"));
        if (oidJsonObject == null) {
            log.warn("Skipping record with missing or unsupported oidList: assetId={}, topic={}, "
                    + "partition={}, offset={}",
                    assetId, record.topic(), record.partition(), record.offset());
            return;
        }

        Map<String, String> map = new HashMap<>();
        for (Map.Entry<String, Object> entry : oidJsonObject.entrySet()) {
            Object oidValue = entry.getValue();
            if (oidValue != null) {
                map.put(entry.getKey(), oidValue.toString());
            }
        }

        if (map.isEmpty()) {
            // Nothing to write for this asset.
            return;
        }
        redisTemplate.opsForHash().putAll(assetId, map);
    }

    /**
     * Normalizes the raw {@code oidList} value into a JSON object.
     *
     * @param rawOidList the value read from the payload; either an already-parsed JSON object
     *                   or a string containing a JSON object
     * @return the JSON object, or {@code null} when the value is absent, empty, or of any
     *         other type
     */
    private static JSONObject toOidObject(Object rawOidList) {
        if (rawOidList instanceof JSONObject) {
            return (JSONObject) rawOidList;
        }
        if (rawOidList instanceof String) {
            return JSONObject.parseObject((String) rawOidList);
        }
        return null;
    }
}
